# Functions to set up Spark data table variable names and define financial distress measures

# Load packages
import calendar
import pandas as pd
import numpy as np
import os
import sys
import shutil


########################################
### Hard-coded table names
########################################
# Each source table is known by two spellings: the lowercase plural name used
# as the directory component of the path, and the capitalised singular stem
# used inside the parquet file name. Both lists share one ordering, so a
# position looked up in tabfn is valid as an index into tabFn.
_TABLE_NAME_PAIRS = (
    ('headers', 'Header'),
    ('customs', 'Custom'),
    ('trades', 'Trade'),
    ('collections', 'Collection'),
    ('pubrecs', 'PubRec'),
)
tabfn, tabFn = (list(spelling) for spelling in zip(*_TABLE_NAME_PAIRS))

# Root directory that the table paths are built from.
BASE = '/data/'



########################################
### Functions to extract variables
########################################

### load one source table and rename its generic fieldN columns
def table_setup(sqlContext, y, m, table, top=0):
    # top: select top rows

    # select top rows
    if top>0:
        lmt = ' limit ' + str(top)
    else:
        lmt = ''


    if table in tabfn:
        n = tabfn.index(table)
        # table name
        tabName = BASE + str(y) + '/' + table + '/dgebooth_' + tabFn[n] + sql_date(y,m) + '*'
        # specify table 
        df = sqlContext.sql("SELECT * FROM parquet.`" + tabName + "` " + lmt)

        #specify variable headers
        if table=='headers': 
            if y>2008:
                #09-15 data:
                df = df.withColumnRenamed("field1", "recordType"
                ).withColumnRenamed("field2", "sequenceNumber"
                ).withColumnRenamed("field3", "inFileSinceDate"
                ).withColumnRenamed("field4", "deceasedDate"
                ).withColumnRenamed("field5", "state"
                ).withColumnRenamed("field6", "zip"
                ).withColumnRenamed("field7", "birthDate"
                ).withColumnRenamed("field8", "akaCounter"
                ).withColumnRenamed("field9", "akaManualCounter"
                ).withColumnRenamed("subjectkey", "subjectKey"
                ).withColumnRenamed("field11", "asOfDate"
                )
            else:
                #00-08 data:
                df = df.withColumnRenamed("field1", "recordType"
                ).withColumnRenamed("field2", "matchType"                 
                ).withColumnRenamed("field3", "sequenceNumber"
                ).withColumnRenamed("field4", "inFileSinceDate"
                ).withColumnRenamed("field5", "addressDiscrepancyFlag"
                ).withColumnRenamed("subjectkey", "subjectKey"
                ).withColumnRenamed("field7", "asOfDate"
                )

        elif table=='customs':
            if y>2008:
                #09-15 data:
                df = df.withColumnRenamed("field1", "recordType"
                ).withColumnRenamed("field1", "sequenceNumber"
                ).withColumnRenamed("field2", "sequenceNumber"
                ).withColumnRenamed("field3", "filler1"
                ).withColumnRenamed("field4", "filler2"
                ).withColumnRenamed("field5", "score"
                ).withColumnRenamed("subjectkey", "subjectKey"
                ).withColumnRenamed("field7", "asOfDate"
                ).withColumnRenamed("field8", "at35a"
                ).withColumnRenamed("field9", "at35b"
                ).withColumnRenamed("field10", "s062s"
                ).withColumnRenamed("field11", "g220a"
                ).withColumnRenamed("field12", "g219a"
                ).withColumnRenamed("field13", "at21s"
                ).withColumnRenamed("field14", "bc21s"
                ).withColumnRenamed("field15", "bc36s"
                ).withColumnRenamed("field16", "g102s"
                ).withColumnRenamed("field17", "re21s"
                ).withColumnRenamed("field18", "re36s"
                ).withColumnRenamed("field19", "in20s"
                ).withColumnRenamed("field20", "in33s"
                ).withColumnRenamed("field21", "re20s"
                ).withColumnRenamed("field22", "at36s"
                ).withColumnRenamed("field23", "g071s"
                ).withColumnRenamed("field24", "g221a"
                ).withColumnRenamed("field25", "g218a"
                ).withColumnRenamed("field26", "cv12"
                ).withColumnRenamed("field27", "at09s"
                ).withColumnRenamed("field28", "bc98a"
                ).withColumnRenamed("field29", "bc_trd"
                ).withColumnRenamed("field30", "co02s"
                ).withColumnRenamed("field31", "br02s"
                ).withColumnRenamed("field32", "rev_trd"
                ).withColumnRenamed("field33", "numPubRec"
                ).withColumnRenamed("field34", "inst_trd"
                ).withColumnRenamed("field35", "mt01s"
                ).withColumnRenamed("field36", "g215b"
                ).withColumnRenamed("field37", "trd"
                ).withColumnRenamed("field38", "re02s"
                ).withColumnRenamed("field39", "numPubRecBkrt"
                ).withColumnRenamed("field40", "g100s"
                ).withColumnRenamed("field41", "re01s"
                ).withColumnRenamed("field42", "re29s"
                ).withColumnRenamed("field43", "at03s"
                ).withColumnRenamed("field44", "bc03s"
                ).withColumnRenamed("field45", "s208s"
                ).withColumnRenamed("field46", "re09s"
                ).withColumnRenamed("field47", "g051s"
                ).withColumnRenamed("field48", "br31s"
                ).withColumnRenamed("field49", "rvlr01"
                ).withColumnRenamed("field50", "re34s"
                ).withColumnRenamed("field51", "g061s"
                ).withColumnRenamed("field52", "g237s"
                ).withColumnRenamed("field53", "at36s2"
                ).withColumnRenamed("field54", "g095s"
                ).withColumnRenamed("field55", "at28b"
                ).withColumnRenamed("field56", "g217s"
                ).withColumnRenamed("field57", "s207s"
                ).withColumnRenamed("field58", "s207a"
                ).withColumnRenamed("field59", "bc28s"
                ).withColumnRenamed("field60", "g215a"
                ).withColumnRenamed("field61", "at33b"
                ).withColumnRenamed("field62", "re33s"
                ).withColumnRenamed("field63", "at33a"
                ).withColumnRenamed("field64", "at28a"
                ).withColumnRenamed("field65", "in28s"
                ).withColumnRenamed("field66", "re28s"
                ).withColumnRenamed("field67", "areaLatitude"
                ).withColumnRenamed("field68", "areaLongitude"
                ).withColumnRenamed("field69", "stateNum"
                ).withColumnRenamed("field70", "blckGeoCode"
                ).withColumnRenamed("field71", "msaNew"
                ).withColumnRenamed("field72", "dma"
                ).withColumnRenamed("field73", "censGeoCode"
                ).withColumnRenamed("field74", "filler3"
                ).withColumnRenamed("field75", "state"
                ).withColumnRenamed("field76", "zipcode"
                ).withColumnRenamed("field77", "filler4"
                ).withColumnRenamed("field78", "filler5"
                ).withColumnRenamed("field79", "filler6"
                ).withColumnRenamed("field80", "filler7"
                ).withColumnRenamed("field81", "filler8"
                ).withColumnRenamed("field82", "filler9"
                ).withColumnRenamed("field83", "filler10"
                ).withColumnRenamed("field84", "filler11"
                ).withColumnRenamed("field85", "filler12"
                ).withColumnRenamed("field86", "filler13"
                ).withColumnRenamed("field87", "filler14"
                ).withColumnRenamed("field88", "filler15"
                ).withColumnRenamed("field89", "filler16"
                ).withColumnRenamed("field90", "filler17"
                ).withColumnRenamed("field91", "filler18"
                ).withColumnRenamed("field92", "filler19"
                ).withColumnRenamed("field93", "filler20"
                ).withColumnRenamed("field94", "filler21"
                ).withColumnRenamed("field95", "filler22"
                ).withColumnRenamed("field96", "filler23"
                ).withColumnRenamed("field97", "filler24"
                ).withColumnRenamed("field98", "filler25"
                ).withColumnRenamed("field99", "filler26"
                ).withColumnRenamed("field100", "filler27"
                ).withColumnRenamed("field101", "filler28"
                ).withColumnRenamed("field102", "filler29"
                ).withColumnRenamed("field103", "filler30"
                ).withColumnRenamed("field104", "filler31"
                ).withColumnRenamed("field105", "filler32"
                ).withColumnRenamed("field106", "filler33"
                ).withColumnRenamed("field107", "filler34"
                ).withColumnRenamed("field108", "filler35"
                ).withColumnRenamed("field109", "filler36"
                ).withColumnRenamed("field110", "filler37"
                ).withColumnRenamed("field111", "filler38"
                ).withColumnRenamed("field112", "filler39"
                ).withColumnRenamed("field113", "filler40"
                ).withColumnRenamed("field114", "filler41"
                ).withColumnRenamed("field115", "filler42"
                ).withColumnRenamed("field116", "filler43"
                ).withColumnRenamed("field117", "filler44"
                ).withColumnRenamed("field118", "filler45"
                ).withColumnRenamed("field119", "filler46"
                ).withColumnRenamed("field120", "filler47"
                ).withColumnRenamed("field121", "filler48"
                ).withColumnRenamed("field122", "filler49"
                ).withColumnRenamed("field123", "filler50"
                ).withColumnRenamed("field124", "filler51"
                ).withColumnRenamed("field125", "filler52"
                ).withColumnRenamed("field126", "filler53"
                ).withColumnRenamed("field127", "filler54"
                ).withColumnRenamed("field128", "filler55"
                ).withColumnRenamed("field129", "filler56"
                ).withColumnRenamed("field130", "filler57"
                ).withColumnRenamed("field131", "filler58"
                ).withColumnRenamed("field132", "filler59"
                ).withColumnRenamed("field133", "at02s"
                ).withColumnRenamed("field134", "at06s"
                ).withColumnRenamed("field135", "at20s"
                ).withColumnRenamed("field136", "at30s"
                ).withColumnRenamed("field137", "at31s"
                ).withColumnRenamed("field138", "at34b"
                ).withColumnRenamed("field139", "at57s"
                ).withColumnRenamed("field140", "at103s"
                ).withColumnRenamed("field141", "at104s"
                ).withColumnRenamed("field142", "au02s"
                ).withColumnRenamed("field143", "au03s"
                ).withColumnRenamed("field144", "au09s"
                ).withColumnRenamed("field145", "au21s"
                ).withColumnRenamed("field146", "au33s"
                ).withColumnRenamed("field147", "au34s"
                ).withColumnRenamed("field148", "au36s"
                ).withColumnRenamed("field149", "au51a"
                ).withColumnRenamed("field150", "bc01s"
                ).withColumnRenamed("field151", "bc02s"
                ).withColumnRenamed("field152", "bc09s"
                ).withColumnRenamed("field153", "bc20s"
                ).withColumnRenamed("field154", "bc27s"
                ).withColumnRenamed("field155", "bc31s"
                ).withColumnRenamed("field156", "bc33s"
                ).withColumnRenamed("field157", "bc34s"
                ).withColumnRenamed("field158", "bc57s"
                ).withColumnRenamed("field159", "bc97a"
                ).withColumnRenamed("field160", "bc102s"
                ).withColumnRenamed("field161", "bc104s"
                ).withColumnRenamed("field162", "bc106s"
                ).withColumnRenamed("field163", "bc107s"
                ).withColumnRenamed("field164", "bc108s"
                ).withColumnRenamed("field165", "bi02s"
                ).withColumnRenamed("field166", "bi09s"
                ).withColumnRenamed("field167", "bi28s"
                ).withColumnRenamed("field168", "bi34s"
                ).withColumnRenamed("field169", "bi36s"
                ).withColumnRenamed("field170", "br20s"
                ).withColumnRenamed("field171", "br21s"
                ).withColumnRenamed("field172", "br109s"
                ).withColumnRenamed("field173", "co03s"
                ).withColumnRenamed("field174", "co04s"
                ).withColumnRenamed("field175", "co05s"
                ).withColumnRenamed("field176", "co07s"
                ).withColumnRenamed("field177", "fc04s"
                ).withColumnRenamed("field178", "fi02s"
                ).withColumnRenamed("field179", "fi09s"
                ).withColumnRenamed("field180", "fi33s"
                ).withColumnRenamed("field181", "fi34s"
                ).withColumnRenamed("field182", "fr02s"
                ).withColumnRenamed("field183", "fr33s"
                ).withColumnRenamed("field184", "fr34s"
                ).withColumnRenamed("field185", "g001s"
                ).withColumnRenamed("field186", "g001b"
                ).withColumnRenamed("field187", "g002s"
                ).withColumnRenamed("field188", "g002b"
                ).withColumnRenamed("field189", "g003s"
                ).withColumnRenamed("field190", "g020s"
                ).withColumnRenamed("field191", "g041s"
                ).withColumnRenamed("field192", "g042s"
                ).withColumnRenamed("field193", "g043s"
                ).withColumnRenamed("field194", "g057s"
                ).withColumnRenamed("field195", "g058s"
                ).withColumnRenamed("field196", "g063s"
                ).withColumnRenamed("field197", "g066s"
                ).withColumnRenamed("field198", "g068s"
                ).withColumnRenamed("field199", "g103s"
                ).withColumnRenamed("field200", "g200s"
                ).withColumnRenamed("field201", "g201a"
                ).withColumnRenamed("field202", "g201b"
                ).withColumnRenamed("field203", "g209s"
                ).withColumnRenamed("field204", "g213b"
                ).withColumnRenamed("field205", "g218c"
                ).withColumnRenamed("field206", "g220b"
                ).withColumnRenamed("field207", "g221b"
                ).withColumnRenamed("field208", "g222s"
                ).withColumnRenamed("field209", "g223s"
                ).withColumnRenamed("field210", "g224a"
                ).withColumnRenamed("field211", "g224b"
                ).withColumnRenamed("field212", "g230s"
                ).withColumnRenamed("field213", "g236s"
                ).withColumnRenamed("field214", "g238s"
                ).withColumnRenamed("field215", "g251c"
                ).withColumnRenamed("field216", "g305s"
                ).withColumnRenamed("field217", "g306s"
                ).withColumnRenamed("field218", "g309s"
                ).withColumnRenamed("field219", "g960s"
                ).withColumnRenamed("field220", "g990s"
                ).withColumnRenamed("field221", "hi02s"
                ).withColumnRenamed("field222", "hi03s"
                ).withColumnRenamed("field223", "hi33s"
                ).withColumnRenamed("field224", "hi34s"
                ).withColumnRenamed("field225", "hi57s"
                ).withColumnRenamed("field226", "hr02s"
                ).withColumnRenamed("field227", "hr03s"
                ).withColumnRenamed("field228", "hr33s"
                ).withColumnRenamed("field229", "hr34s"
                ).withColumnRenamed("field230", "hr57s"
                ).withColumnRenamed("field231", "in02s"
                ).withColumnRenamed("field232", "in06s"
                ).withColumnRenamed("field233", "in21s"
                ).withColumnRenamed("field234", "in34s"
                ).withColumnRenamed("field235", "lm01s"
                ).withColumnRenamed("field236", "lm02s"
                ).withColumnRenamed("field237", "lm03s"
                ).withColumnRenamed("field238", "lm04s"
                ).withColumnRenamed("field239", "lm06s"
                ).withColumnRenamed("field240", "mt02s"
                ).withColumnRenamed("field241", "mt03s"
                ).withColumnRenamed("field242", "mt20s"
                ).withColumnRenamed("field243", "mt21s"
                ).withColumnRenamed("field244", "mt33s"
                ).withColumnRenamed("field245", "mt34s"
                ).withColumnRenamed("field246", "mt36s"
                ).withColumnRenamed("field247", "mt57s"
                ).withColumnRenamed("field248", "re30s"
                ).withColumnRenamed("field249", "rp03s"
                ).withColumnRenamed("field250", "rp04s"
                ).withColumnRenamed("field251", "rp05s"
                ).withColumnRenamed("field252", "s004s"
                ).withColumnRenamed("field253", "s043s"
                ).withColumnRenamed("field254", "s061s"
                ).withColumnRenamed("field255", "s064a"
                ).withColumnRenamed("field256", "s064b"
                ).withColumnRenamed("field257", "s068a"
                ).withColumnRenamed("field258", "s068b"
                ).withColumnRenamed("field259", "s071a"
                ).withColumnRenamed("field260", "s071b"
                ).withColumnRenamed("field261", "s073a"
                ).withColumnRenamed("field262", "s073b"
                ).withColumnRenamed("field263", "s114s"
                ).withColumnRenamed("field264", "st02s"
                ).withColumnRenamed("field265", "st03s"
                ).withColumnRenamed("field266", "st05s"
                ).withColumnRenamed("field267", "st20s"
                ).withColumnRenamed("field268", "st28s"
                ).withColumnRenamed("field269", "st33s"
                ).withColumnRenamed("field270", "st57s"
                ).withColumnRenamed("field271", "atap01"
                ).withColumnRenamed("field272", "hiap01 "
                ).withColumnRenamed("field273", "hrap01"
                ).withColumnRenamed("field274", "mtap01"
                ).withColumnRenamed("field275", "reap01"
                ).withColumnRenamed("field276", "s063a"
                ).withColumnRenamed("field277", "g106s"
                ).withColumnRenamed("field278", "st102s"
                ).withColumnRenamed("field279", "st103s"
                ).withColumnRenamed("field280", "filler60"
                ).withColumnRenamed("field281", "filler61"
                ).withColumnRenamed("field282", "filler62"
                ).withColumnRenamed("field283", "filler63"
                ).withColumnRenamed("field284", "filler64"
                ).withColumnRenamed("field285", "filler65"
                ).withColumnRenamed("field286", "filler66"
                ).withColumnRenamed("field287", "filler67"
                ).withColumnRenamed("field288", "filler68"
                ).withColumnRenamed("field289", "filler69"
                ).withColumnRenamed("field290", "filler70"
                ).withColumnRenamed("field291", "filler71"
                ).withColumnRenamed("field292", "filler72"
                ).withColumnRenamed("field293", "filler73"
                ).withColumnRenamed("field294", "filler74"
                ).withColumnRenamed("field295", "filler75"
                ).withColumnRenamed("field296", "filler76"
                ).withColumnRenamed("field297", "filler77"
                ).withColumnRenamed("field298", "filler78"
                ).withColumnRenamed("field299", "filler79"
                ).withColumnRenamed("field300", "filler80"
                ).withColumnRenamed("field301", "filler81"
                ).withColumnRenamed("field302", "filler82"
                ).withColumnRenamed("field303", "filler83"
                ).withColumnRenamed("field304", "filler84"
                ).withColumnRenamed("field305", "filler85"
                ).withColumnRenamed("field306", "filler86"
                ).withColumnRenamed("field307", "filler87"
                ).withColumnRenamed("field308", "filler88"
                ).withColumnRenamed("field309", "filler89"
                ).withColumnRenamed("field310", "filler90"
                ).withColumnRenamed("field311", "filler91"
                ).withColumnRenamed("field312", "filler92"
                ).withColumnRenamed("field313", "filler93"
                ).withColumnRenamed("field314", "filler94"
                ).withColumnRenamed("field315", "filler95"
                ).withColumnRenamed("field316", "filler96"
                ).withColumnRenamed("field317", "filler97"
                ).withColumnRenamed("field318", "filler98"
                ).withColumnRenamed("field319", "filler99"
                ).withColumnRenamed("field320", "filler100"
                ).withColumnRenamed("field321", "filler101"
                ).withColumnRenamed("field322", "filler102"
                ).withColumnRenamed("field323", "filler103"
                ).withColumnRenamed("field324", "filler104"
                ).withColumnRenamed("field325", "filler105"
                ).withColumnRenamed("field326", "filler106"
                ).withColumnRenamed("field327", "filler107"
                ).withColumnRenamed("field328", "filler108"
                ).withColumnRenamed("field329", "filler109"
                ).withColumnRenamed("field330", "filler110"
                ).withColumnRenamed("field331", "filler111"
                ).withColumnRenamed("field332", "filler112"
                ).withColumnRenamed("field333", "filler113"
                ).withColumnRenamed("field334", "filler114"
                ).withColumnRenamed("field335", "filler115"
                ).withColumnRenamed("field336", "filler116"
                ).withColumnRenamed("field337", "filler117"
                ).withColumnRenamed("field338", "filler118"
                ).withColumnRenamed("field339", "rt27s"
                ).withColumnRenamed("field340", "rt30s"
                ).withColumnRenamed("field341", "rt34s"
                ).withColumnRenamed("field342", "rt101s"
                ).withColumnRenamed("field343", "filler119"
                )
            else:
                #00-08 data:
                df = df.withColumnRenamed("field1", "recordType"
                ).withColumnRenamed("field2", "filter1"
                ).withColumnRenamed("field3", "sequenceNumber"
                ).withColumnRenamed("field4", "filter2"
                ).withColumnRenamed("field5", "filter3"
                ).withColumnRenamed("field6", "score"
                ).withColumnRenamed("subjectKey", "subjectKey"
                ).withColumnRenamed("field8", "asOfDate"
                ).withColumnRenamed("field9", "scrat35"
                ).withColumnRenamed("field10", "scrs062"
                ).withColumnRenamed("field11", "cur4in12"
                ).withColumnRenamed("field12", "cur3in12"
                ).withColumnRenamed("field13", "scrat21"
                ).withColumnRenamed("field14", "scrbc21"
                ).withColumnRenamed("field15", "scrbc36"
                ).withColumnRenamed("field16", "scrg102"
                ).withColumnRenamed("field17", "re21"
                ).withColumnRenamed("field18", "re36"
                ).withColumnRenamed("field19", "in20"
                ).withColumnRenamed("field20", "scrin33"
                ).withColumnRenamed("field21", "scrre20"
                ).withColumnRenamed("field22", "scrg071"
                ).withColumnRenamed("field23", "cur5in12"
                ).withColumnRenamed("field24", "cur2in12"
                ).withColumnRenamed("field25", "scrg043"
                ).withColumnRenamed("field26", "scrat07"
                ).withColumnRenamed("field27", "scrat09"
                ).withColumnRenamed("field28", "scrbc98"
                ).withColumnRenamed("field29", "scrbc01"
                ).withColumnRenamed("field30", "chgoff12"
                ).withColumnRenamed("field31", "s069"
                ).withColumnRenamed("field32", "scrs012"
                ).withColumnRenamed("field33", "numPubRec"
                ).withColumnRenamed("field34", "in01"
                ).withColumnRenamed("field35", "scrmt01"
                ).withColumnRenamed("field36", "colexmed"
                ).withColumnRenamed("field37", "scrat01"
                ).withColumnRenamed("field38", "scrs012_2"
                ).withColumnRenamed("field39", "numPubRecBkrt"
                ).withColumnRenamed("field40", "re01"
                ).withColumnRenamed("field41", "re29"
                ).withColumnRenamed("field42", "scrat03"
                ).withColumnRenamed("field43", "scrbc03"
                ).withColumnRenamed("field44", "scrs065"
                ).withColumnRenamed("field45", "re09"
                ).withColumnRenamed("field46", "scrg051"
                ).withColumnRenamed("field47", "bk31"
                ).withColumnRenamed("field48", "br34"
                ).withColumnRenamed("field49", "scrre34"
                ).withColumnRenamed("field50", "scrg061"
                ).withColumnRenamed("field51", "g980"
                ).withColumnRenamed("field52", "scrat36"
                ).withColumnRenamed("field53", "scrg095"
                ).withColumnRenamed("field54", "scrat28"
                ).withColumnRenamed("field55", "scrg091"
                ).withColumnRenamed("field56", "bc28"
                ).withColumnRenamed("field57", "scrs064"
                ).withColumnRenamed("field58", "scrat99"
                ).withColumnRenamed("field59", "scrre33"
                ).withColumnRenamed("field60", "scrat33"
                ).withColumnRenamed("field61", "scrin28"
                ).withColumnRenamed("field62", "scrre28"
                ).withColumnRenamed("field63", "areaLatitude"
                ).withColumnRenamed("field64", "areaLongitude"
                ).withColumnRenamed("field65", "stateNum"
                ).withColumnRenamed("field66", "blckGeoCode"
                ).withColumnRenamed("field67", "msaNew"
                ).withColumnRenamed("field68", "dma"
                ).withColumnRenamed("field69", "censGeoCode"
                ).withColumnRenamed("field70", "filter4"
                ).withColumnRenamed("field71", "state"
                ).withColumnRenamed("field72", "zipcode"
                )
        elif table=='trades':
            if y>2008:
                df = df.withColumnRenamed("field1", "recordType"
                ).withColumnRenamed("field2", "sequenceNumber"
                ).withColumnRenamed("field3", "accountIdentifier"
                ).withColumnRenamed("field4", "industryCode"
                ).withColumnRenamed("field5", "filler1"
                ).withColumnRenamed("field6", "filler2"
                ).withColumnRenamed("field7", "filler3"
                ).withColumnRenamed("field8", "filler4"
                ).withColumnRenamed("field9", "ecoaCode"
                ).withColumnRenamed("field10", "openDate"
                ).withColumnRenamed("field11", "endDate"
                ).withColumnRenamed("field12", "effectiveDate"
                ).withColumnRenamed("field13", "paidOutDate"
                ).withColumnRenamed("field14", "closedDate"
                ).withColumnRenamed("field15", "lastPaymentDate"
                ).withColumnRenamed("field16", "currentBalanceAmount"
                ).withColumnRenamed("field17", "highCreditAmount"
                ).withColumnRenamed("field18", "creditLimitAmount"
                ).withColumnRenamed("field19", "originalChargeOffAmount"
                ).withColumnRenamed("field20", "paymentAmount"
                ).withColumnRenamed("field21", "pastDueAmount"
                ).withColumnRenamed("field22", "paymentDueAmount"
                ).withColumnRenamed("field23", "paymentScheduleMonthCount"
                ).withColumnRenamed("field24", "frequencyOfPayment"
                ).withColumnRenamed("field25", "affiliateRemarkCode"
                ).withColumnRenamed("field26", "affiliateRemarkCodeDate"
                ).withColumnRenamed("field27", "complianceRemarkCode"
                ).withColumnRenamed("field28", "complianceRemarkCodeDate"
                ).withColumnRenamed("field29", "genericRemarkCode"
                ).withColumnRenamed("field30", "genericRemarkCodeDate"
                ).withColumnRenamed("field31", "ratingRemarkCode"
                ).withColumnRenamed("field32", "ratingRemarkCodeDate"
                ).withColumnRenamed("field33", "mannerOfPayment"
                ).withColumnRenamed("field34", "paymentPattern"
                ).withColumnRenamed("field35", "maxDelMannerOfPayment"
                ).withColumnRenamed("field36", "maxDelRateDate"
                ).withColumnRenamed("field37", "maxDelPastDueAmount"
                ).withColumnRenamed("field38", "mostRecentMaxDelDate"
                ).withColumnRenamed("field39", "mostRecentMaxDelAmount"
                ).withColumnRenamed("field40", "paymentTypeCode"
                ).withColumnRenamed("field41", "balloonPaymentAmount"
                ).withColumnRenamed("field42", "balloonDueDate"
                ).withColumnRenamed("field43", "deferredPaymentStartDate"
                ).withColumnRenamed("field44", "portfolioSaleIndicatorCode"
                ).withColumnRenamed("field45", "creditorClassificiationCode"
                ).withColumnRenamed("field46", "prohibitedCode"
                ).withColumnRenamed("field47", "updateIndicator"
                ).withColumnRenamed("field48", "filler5"
                ).withColumnRenamed("subjectkey", "subjectKey"
                ).withColumnRenamed("field50", "tradeKey"
                ).withColumnRenamed("field51", "assetClassTag"
                ).withColumnRenamed("field52", "asOfDate"
                ).withColumnRenamed("field53", "lenderKey"
                )
            else:
                #00-08
                df = df.withColumnRenamed("field1", "recordType"
                ).withColumnRenamed("field2", "matchType"                 
                ).withColumnRenamed("field3", "sequenceNumber"
                ).withColumnRenamed("field4", "filler1"
                ).withColumnRenamed("field5", "subscriberCode"
                ).withColumnRenamed("field6", "openDate"
                ).withColumnRenamed("field7", "ecoaCode"
                ).withColumnRenamed("field8", "highCreditAmount"
                ).withColumnRenamed("field9", "effectiveDate"
                ).withColumnRenamed("field10", "reportedDate"
                ).withColumnRenamed("field11", "closedDate"  
                ).withColumnRenamed("field12", "paidOutDate"  
                ).withColumnRenamed("field13", "dateVerificationIndicator"  
                ).withColumnRenamed("field14", "paymentPattern"    
                ).withColumnRenamed("field15", "currentBalanceAmount"  
                ).withColumnRenamed("field16", "pastDueAmount"   
                ).withColumnRenamed("field17", "filler2"       
                ).withColumnRenamed("field18", "mannerOfPayment"  
                ).withColumnRenamed("field19", "creditLimitAmount"  
                ).withColumnRenamed("field20", "terms"           
                ).withColumnRenamed("field21", "maxDelRateDate"           
                ).withColumnRenamed("field22", "maxDelPastDueAmount"           
                ).withColumnRenamed("field23", "maxDelPastDueValue"  
                ).withColumnRenamed("field24", "paymentScheduleMonthCount"   
                ).withColumnRenamed("field25", "pastDue3059"        
                ).withColumnRenamed("field26", "pastDue6089"      
                ).withColumnRenamed("field27", "pastDue90"      
                ).withColumnRenamed("field28", "filler3"        
                ).withColumnRenamed("field29", "disputeCode"     
                ).withColumnRenamed("field30", "collateralCode"     
                ).withColumnRenamed("field31", "filler4"        
                ).withColumnRenamed("field32", "shortSubscriberName"        
                ).withColumnRenamed("field33", "filler5"        
                ).withColumnRenamed("field34", "filler6" 
                ).withColumnRenamed("subjectkey", "subjectKey"
                ).withColumnRenamed("field36", "tradeKey"
                ).withColumnRenamed("field37", "assetClassTag"
                ).withColumnRenamed("field38", "asOfDate"
                )
        elif table=='collections':
            if y>2008:
                #09-15 data:
                df = df.withColumnRenamed("field1", "recordType"
                ).withColumnRenamed("field2", "sequenceNumber"
                ).withColumnRenamed("field3", "industryCode"
                ).withColumnRenamed("field4", "filler1"
                ).withColumnRenamed("field5", "filler2"
                ).withColumnRenamed("field6", "ecoaCode"
                ).withColumnRenamed("field7", "openDate"
                ).withColumnRenamed("field8", "effectiveDate"
                ).withColumnRenamed("field9", "paidOutDate"
                ).withColumnRenamed("field10", "closedDate"
                ).withColumnRenamed("field11", "lastPaymentDate"
                ).withColumnRenamed("field12", "highCreditAmount"
                ).withColumnRenamed("field13", "currentBalanceAmount"
                ).withColumnRenamed("field14", "paymentAmount"
                ).withColumnRenamed("field15", "pastDueAmount"
                ).withColumnRenamed("field16", "portfolioTypeCode"
                ).withColumnRenamed("field17", "mannerOfPayment"
                ).withColumnRenamed("field18", "accountTypeCode"
                ).withColumnRenamed("field19", "affiliateRemarkCode"
                ).withColumnRenamed("field20", "affiliateRemarkCodeFirstReportDate"
                ).withColumnRenamed("field21", "complianceRemarkCode"
                ).withColumnRenamed("field22", "complianceRemarkCodeFirstReportDate"
                ).withColumnRenamed("field23", "genericRemarkCode"
                ).withColumnRenamed("field24", "genericRemarkCodeFirstReportDate"
                ).withColumnRenamed("field25", "ratingRemarkCode"
                ).withColumnRenamed("field26", "ratingRemarkCodeFirstReportDate"
                ).withColumnRenamed("field27", "creditorClassificiationCode"
                ).withColumnRenamed("field28", "portfolioSaleIndicatorCode"        
                ).withColumnRenamed("field29", "prohibitedCode"
                ).withColumnRenamed("field30", "updateIndicator"
                ).withColumnRenamed("field31", "filler3"
                ).withColumnRenamed("subjectkey", "subjectKey"
                ).withColumnRenamed("field33", "collectionKey"
                ).withColumnRenamed("field34", "asOfDate"
                )
            else:
                #00-08 data
                df = df.withColumnRenamed("field1", "recordType"
                ).withColumnRenamed("field2", "matchType"                 
                ).withColumnRenamed("field3", "sequenceNumber"
                ).withColumnRenamed("field4", "openDate"             #reportedDate in dictionary
                ).withColumnRenamed("field5", "filter1"
                ).withColumnRenamed("field6", "highCreditAmount"     #amountOwned in dictionary           
                ).withColumnRenamed("field7", "status"
                ).withColumnRenamed("field8", "paidOutDate"
                ).withColumnRenamed("field9", "ecoaCode"
                ).withColumnRenamed("field10", "filter2"
                ).withColumnRenamed("field11", "filter3"                
                ).withColumnRenamed("subjectkey", "subjectKey"
                ).withColumnRenamed("field13", "asOfDate"     
                )  
        elif table=='pubrecs':
            if y>2008:
                #09-15 data:
                df = df.withColumnRenamed("field1", "recordType"
                ).withColumnRenamed("field2", "sequenceNumber"
                ).withColumnRenamed("field3", "industryCode"
                ).withColumnRenamed("field4", "filler1"
                ).withColumnRenamed("field5", "publicRecordCode"
                ).withColumnRenamed("field6", "effectiveDate"
                ).withColumnRenamed("field7", "filedDate"
                ).withColumnRenamed("field8", "paidDate"
                ).withColumnRenamed("field9", "paymentAmount"
                ).withColumnRenamed("field10", "publicRecordSource"
                ).withColumnRenamed("field11", "ecoaCode"
                ).withColumnRenamed("field12", "prohibitedCode"
                ).withColumnRenamed("subjectkey", "subjectKey"
                ).withColumnRenamed("field14", "asOfDate"
                )
            else:
                #00-08 data
                df = df.withColumnRenamed("field1", "recordType"
                ).withColumnRenamed("field2", "matchType"                 
                ).withColumnRenamed("field3", "sequenceNumber"
                ).withColumnRenamed("field4", "filedDate"
                ).withColumnRenamed("field5", "amount"
                ).withColumnRenamed("field6", "publicRecordCode"
                ).withColumnRenamed("field7", "paidDate"
                ).withColumnRenamed("field8", "ecoaCode"
                ).withColumnRenamed("field9", "filter1"
                ).withColumnRenamed("field10", "filter2"
                ).withColumnRenamed("field11", "filter3"
                ).withColumnRenamed("field12", "filter4"
                ).withColumnRenamed("field13", "filter5"                
                ).withColumnRenamed("subjectkey", "subjectKey"
                ).withColumnRenamed("field15", "asOfDate"
                )
        else:
            print("Invalid table name !")         

    else:
        print("Invalid table name !")
    

    #return dataframe
    return df



####################################################################
# Helper functions for SQL codes
####################################################################


### get date according to YYYYMMDD format
def sql_date(y, m):
    """Return the month-end date of month ``m`` in year ``y`` as one string.

    The result is ``str(y)``, the month zero-padded to two digits, and the
    number of the last day of that month (e.g. ``'20120229'``).
    """
    # monthrange returns (weekday of day 1, number of days); keep the latter
    last_day = calendar.monthrange(y, m)[1]

    # single-digit months get a leading zero
    month_part = ('0' if m < 10 else '') + str(m)

    return ''.join([str(y), month_part, str(last_day)])

### get date according to YYYYMM format
def yyyymm(y, m):
    """Return ``str(y)`` followed by the month ``m`` zero-padded to two digits."""
    # months below 10 are prefixed with '0'; everything else is used as is
    return str(y) + ('0' + str(m) if m < 10 else str(m))

### get date of last quarter according to YYYYMMDD format
def lastQ(y, m, showD=True):
    """Return the month-end date three months before (``y``, ``m``) as a string.

    With ``showD`` true the string is year + two-digit month + last day of
    that month; otherwise the day is left off (year + two-digit month).
    """
    # months 1-3 roll back into the previous year (m + 9 == 12 + m - 3)
    if m <= 3:
        prev_year, prev_month = y - 1, m + 9
    else:
        prev_year, prev_month = y, m - 3

    # the month-end day is looked up even when it is not shown
    end_day = calendar.monthrange(prev_year, prev_month)[1]

    stamp = str(prev_year) + ('0' if prev_month < 10 else '') + str(prev_month)
    return stamp + str(end_day) if showD else stamp

### get the month-end date `last` years earlier (default: one year) in YYYYMMDD format, or YYYYMM when showD is False
def lastY(y, m, showD=True, last=1):
    """Return the month-end date of month ``m``, ``last`` years before ``y``.

    With ``showD`` true the string is year + two-digit month + last day of
    that month; otherwise only year + two-digit month is returned.
    """
    prior_year = y - last

    # the month-end day is looked up even when it is not shown
    end_day = calendar.monthrange(prior_year, m)[1]

    stamp = str(prior_year) + ('0' if m < 10 else '') + str(m)
    if not showD:
        return stamp
    return stamp + str(end_day)

### quarter end months to loop through
def quarter_end_month(y,
                      QsStart=(12,), QsEnd=(3, 6, 9), Qs=(3, 6, 9, 12),
                      startY=2000, endY=2016):
    """Return the list of quarter-end months to loop through for year ``y``.

    Args:
        y: year being processed.
        QsStart: months used when ``y == startY``.
        QsEnd: months used when ``y == endY`` (and ``y != startY``).
        Qs: months used for every other year.
        startY: first year of the sample.
        endY: last year of the sample.

    Returns:
        A new ``list`` holding the selected months.

    The defaults are tuples and the result is always a fresh list: the
    previous version returned its own default list objects, so a caller that
    modified the result silently changed the defaults for all later calls.
    """
    if y == startY:
        months = QsStart
    elif y == endY:
        months = QsEnd
    else:
        months = Qs

    # copy so that neither the defaults nor a caller-supplied sequence is shared
    return list(months)



### Spark saves CSV output as a folder that also contains some auxiliary files;
# the following function cleans this up,
# leaving only the intended output csv
def cleanOutput(fn, outTab, outdir, workdir, sqlContext):
    """Export the parquet table ``workdir/outTab`` as the CSV ``outdir/fn.csv``.

    The table is read through ``sqlContext``, coalesced to one partition and
    written with a header row to the folder ``outdir/fn``. Every ``.csv`` file
    found in that folder is moved up into ``outdir`` and renamed to
    ``fn + '.csv'``; the folder is then deleted.
    """
    query = 'SELECT * FROM parquet.`' + os.path.join(workdir, outTab)   + '`'

    # the write target is a folder, not a single file
    spark_folder = os.path.join(outdir, fn)

    # one partition so that the folder holds one part file
    frame = sqlContext.sql(query)
    writer = frame.coalesce(1).write.mode('overwrite').format("com.databricks.spark.csv").option("header", "true")
    writer.save(spark_folder)

    # keep only the csv part file(s); everything else goes away with the folder
    for entry in os.listdir(spark_folder):
        if not entry.endswith('.csv'):
            continue
        staged = os.path.join(outdir, entry)
        # first lift the file out of the folder, then give it its final name
        shutil.move(os.path.join(spark_folder, entry), staged)
        os.rename(staged, os.path.join(outdir, fn + '.csv'))

    shutil.rmtree(spark_folder)


# This function saves the data to .dta format to outdir, but you need to install 'pyarrow' first.
def saveStata(sql, name, workdir, outdir, sqlContext=None):
    """Run ``sql`` and save the result as ``outdir/name.dta`` (Stata format).

    The query result is first written as parquet to ``workdir/name`` and then
    read back with pandas (``engine='pyarrow'``, so 'pyarrow' must be
    installed) before being written with ``DataFrame.to_stata``.

    Args:
        sql: SQL text to run.
        name: name of the parquet folder under ``workdir`` and base name of
            the ``.dta`` file.
        workdir: directory that receives the intermediate parquet output.
        outdir: directory that receives the ``.dta`` file. The path is built
            with ``os.path.join``, so a trailing separator is optional
            (previously ``outdir + name`` wrote next to ``outdir`` when the
            separator was missing).
        sqlContext: context used to run ``sql``. Optional for backward
            compatibility: when omitted, a module-level ``sqlContext`` is
            used if one exists.

    Raises:
        NameError: if no ``sqlContext`` is passed and none is defined at
            module level (the same exception type the old code produced).
    """
    if sqlContext is None:
        # the original body relied on a global that this file never defines
        sqlContext = globals().get('sqlContext')
        if sqlContext is None:
            raise NameError("saveStata needs a sqlContext: pass it as an argument "
                            "or define it at module level")

    parquet_path = os.path.join(workdir, name)
    sqlContext.sql(sql).write.mode('overwrite').parquet(parquet_path)

    df = pd.read_parquet(parquet_path, engine='pyarrow')
    df.to_stata(os.path.join(outdir, name + '.dta'), write_index=False)
    # release the pandas copy before returning
    del df
    print('*** '+name+' is saved ***')


####################################################################
# Functions to construct financial distress measures
####################################################################

### extract key TU outcome variables from customers, trades, public for one snapshot
# the output is individual level data with financial distress measures
def data_snapshot(sqlContext, WORKDIR, y, m, CW='parquet.`/geo_debt/geo/cw/cw_zip_cz`', outTab='merge_temp', filterTab='', snapShotData=True):
    """Build the individual-level distress table for one snapshot and return aggregation SQL.

    Steps performed:
      1. Load the headers, pubrecs, trades, collections and customs tables for
         (y, m) through table_setup and register them as temp views
         (headersData is registered but not queried in this function).
      2. Left-join the custom rows with per-subject collection measures and
         write the result to WORKDIR/custom_temp.
      3. Left-join that with per-subject credit card trade measures, write
         WORKDIR/custom_temp2 and delete custom_temp.
      4. Left-join that with per-subject public record (bankruptcy) measures,
         write WORKDIR/outTab, print the row count and delete custom_temp2.
      5. Clear the SQL cache and return a string of aggregate expressions.

    All joins are on subjectKey and asOfDate.

    Args:
        sqlContext: context passed to table_setup and used for .sql() and
            .clearCache().
        WORKDIR: directory holding the intermediate and output parquet folders.
        y: snapshot year; y > 2008 selects the newer column layout and
            YYYYMMDD cut-off dates, otherwise YYYYMM cut-offs are used.
        m: snapshot month.
        CW: not referenced in this function.
        outTab: name of the output parquet folder under WORKDIR.
        filterTab: if non-empty, name of a parquet table under WORKDIR; only
            custom rows whose subjectKey appears in it are kept.
        snapShotData: if true, return the full list of aggregates; otherwise
            a reduced list.

    Returns:
        A string of comma-separated aggregate expressions (COUNT/AVG ... AS ...)
        without a trailing comma, meant to be placed right before FROM.
    """

    ym = yyyymm(y, m)

    #### last quarter date for public record
    # NOTE: lastPubrecQ and lastPubrecY7 are computed here but not referenced
    # anywhere below in this function.
    if y>2008:
        #YYYYMMDD
        lastPubrecQ = lastQ(y, m)
        lastPubrecY = lastY(y, m)
        lastPubrecY3 = lastY(y, m, last=3)
        lastPubrecY7 = lastY(y, m, last=7)
    else:
        #YYYYMM
        lastPubrecQ = lastQ(y, m, showD=False)
        lastPubrecY = lastY(y, m, showD=False)
        lastPubrecY3 = lastY(y, m, showD=False, last=3)
        lastPubrecY7 = lastY(y, m, showD=False, last=7)

    #### define tables
    headersData = table_setup(sqlContext, y, m, 'headers')
    headersData.createOrReplaceTempView("headersData")

    pubrecsData = table_setup(sqlContext, y, m, 'pubrecs')
    pubrecsData.createOrReplaceTempView("pubrecsData")    

    tradesData  = table_setup(sqlContext, y, m, 'trades')
    tradesData.createOrReplaceTempView("tradesData")

    collectionsData = table_setup(sqlContext, y, m, 'collections')
    collectionsData.createOrReplaceTempView("collectionsData")

    customsData = table_setup(sqlContext, y, m, 'customs')
    customsData.createOrReplaceTempView("customsData")

    
    #### do we want to filter segment tables?
    # add this SQL syntax after WHERE
    # (the fragment ends with "AND", so another condition must follow it;
    #  below it is only inserted into the custom table query)
    if filterTab:
        filterFn  = 'parquet.`' + os.path.join(WORKDIR, filterTab) + '`'
        filterSQL = " subjectKey IN (SELECT DISTINCT a.subjectKey FROM " + filterFn + " a ) AND "
    else:
        filterSQL = ""


    #### trade table #### 
    # focus on credit card, count how many accounts are currently delinquent based on MOP
    # MOP 2: 30 DPD
    #     3: 60 DPD
    #     4: 90 DPD
    #     5: 120+ DPD
    #     7: wage earner (chapter 13)
    #     8: repossession; 8A: voluntary surrender; 8P repossession paid in full
    #     9: charged off; 9B in collection; 9P paid charge-off/collection
    # cc_ncr counts the wider code list (incl. 7, 8, 9, 9B); cc_dlq only codes 2-5
    sql_trade = "SELECT asOfDate, subjectKey, " + \
              "SUM ( CASE WHEN mannerOfPayment in ('02','03','04','05','07','08','09','9B',2,3,4,5,7,8,9)  THEN 1 ELSE 0 END ) AS cc_ncr, " + \
              "SUM ( CASE WHEN mannerOfPayment in ('02','03','04','05',                    2,3,4,5      )  THEN 1 ELSE 0 END ) AS cc_dlq " + \
              "FROM tradesData WHERE assetClassTag = 'CRED' GROUP BY asOfDate, subjectKey"


    #### collection table #### 
    # the condition fragments differ by data regime: the newer layout uses
    # mannerOfPayment/prohibitedCode, the older one uses status and has no
    # medical / non-medical split (medCond and nonMedCond are blank)
    # NOTE: colCondNoDispute is defined in both branches but not used below.
    if y>2008:
        colvar     = ' currentBalanceAmount '
        medCond    = ' AND prohibitedCode = 2 '
        nonMedCond = ' AND prohibitedCode = 1 '
        colCond    = " highCreditAmount >100 AND mannerOfPayment = '9B' " 
        colCond1Y  = " highCreditAmount >100 AND mannerOfPayment = '9B' AND (paidOutDate<=0 OR paidOutDate IS NULL) AND openDate > " + lastPubrecY
        colCond3Y  = " highCreditAmount >100 AND mannerOfPayment = '9B' AND (paidOutDate<=0 OR paidOutDate IS NULL) AND openDate > " + lastPubrecY3
        colCondNoDispute    = " highCreditAmount  >100 AND mannerOfPayment = '9B' AND complianceRemarkCode IS NULL " 
        
    else:
        colvar     = ' highCreditAmount '
        medCond    = ' '
        nonMedCond = ' '
        colCond    = " highCreditAmount >100 AND status = 'UP' " 
        colCond1Y  = " highCreditAmount >100 AND status = 'UP' AND (paidOutDate<=0 OR paidOutDate IS NULL) AND openDate > " + lastPubrecY
        colCond3Y  = " highCreditAmount >100 AND status = 'UP' AND (paidOutDate<=0 OR paidOutDate IS NULL) AND openDate > " + lastPubrecY3
        colCondNoDispute    = " highCreditAmount  >100 AND status = 'UP' " 


    # per (asOfDate, subjectKey): counts and summed amounts of collections
    # matching each condition
    sql_collection = "SELECT asOfDate, subjectKey, " + \
              "COUNT(subjectKey) AS col, " + \
              "SUM(CASE WHEN " + colCond              + " THEN 1 ELSE 0 END)              AS unpdcol,    " + \
              "SUM(CASE WHEN " + colCond1Y            + " THEN 1 ELSE 0 END)              AS unpdcoly1,  " + \
              "SUM(CASE WHEN " + colCond3Y            + " THEN 1 ELSE 0 END)              AS unpdcoly3,  " + \
              "SUM(CASE WHEN " + colCond              + " THEN " + colvar + " ELSE 0 END) AS colbal,     " + \
              "SUM(CASE WHEN " + colCond1Y            + " THEN " + colvar + " ELSE 0 END) AS colbaly1,   " + \
              "SUM(CASE WHEN " + colCond3Y            + " THEN " + colvar + " ELSE 0 END) AS colbaly3,   " + \
              "SUM(CASE WHEN " + colCond + medCond    + " THEN 1 ELSE 0 END)              AS colmed,     " + \
              "SUM(CASE WHEN " + colCond + medCond    + " THEN " + colvar + " ELSE 0 END) AS medbal,     " + \
              "SUM(CASE WHEN " + colCond + nonMedCond + " THEN 1 ELSE 0 END)              AS colexmed,   " + \
              "SUM(CASE WHEN " + colCond + nonMedCond + " THEN " + colvar + " ELSE 0 END) AS exmedbal    " + \
              "FROM collectionsData WHERE subjectKey IS NOT NULL GROUP BY asOfDate, subjectKey"


    #### public record ####
    # stock: currently in bankruptcy
    # flow: chapter 7, 13, any bankruptcy filing over the past year
    # NOTE(review): the flow columns below compare filedDate against
    # lastPubrecY3 (three years back), not one year - confirm which is intended.
    sql_pubrec = "SELECT asOfDate, subjectKey, "     + \
              "SUM(CASE WHEN  publicRecordCode IS NOT NULL         THEN 1 ELSE NULL END) AS bkrt,  " + \
              "SUM(CASE WHEN  publicRecordCode in ('7F','7D','7X') THEN 1 ELSE NULL END) AS bkrt7,  " + \
              "SUM(CASE WHEN  publicRecordCode in ('3F','3D','3X') THEN 1 ELSE NULL END) AS bkrt13, " + \
              "SUM(CASE WHEN (publicRecordCode IS NOT NULL         AND filedDate > " + lastPubrecY3 + ") THEN 1 ELSE NULL END) AS bkrty3,  " + \
              "SUM(CASE WHEN (publicRecordCode in ('7F','7D','7X') AND filedDate > " + lastPubrecY3 + ") THEN 1 ELSE NULL END) AS bkrt7y3, " + \
              "SUM(CASE WHEN (publicRecordCode in ('3F','3D','3X') AND filedDate > " + lastPubrecY3 + ") THEN 1 ELSE NULL END) AS bkrt13y3 " + \
              "FROM pubrecsData WHERE publicRecordCode in ('7F','1F','2F','3F','7D','1D','2D','3D','7X','1X','2X','3X') GROUP BY asOfDate, subjectKey"
    
    #### custom table #### 
    # pre 2008, we don't have medical collection from collection table, need this from custom
    # this variable is named differently pre/post 2008
    # monthdlq is set to 999 when the source column is negative or NULL
    if y>2008:
        nonmedcoll = ' s068b AS colexmed '
        cc_mdlq    = ' CASE WHEN bc36s<0 OR bc36s IS NULL THEN 999 ELSE bc36s END AS monthdlq '
    else:
        nonmedcoll = ' colexmed '
        cc_mdlq    = ' CASE WHEN scrbc36<0 OR scrbc36 IS NULL THEN 999 ELSE scrbc36 END AS monthdlq '



    # scores of 300 or below are turned into NULL; the optional subject filter is applied here
    sql_custom = "SELECT asOfDate, subjectKey, zipcode, CASE WHEN score<=300 THEN NULL ELSE score END AS score, " + nonmedcoll + ", " + cc_mdlq + \
              "FROM customsData WHERE " + filterSQL + "  subjectKey IS NOT NULL"
    
    #different method to compute medical vs nonmedical collection
    # post 2008, collection table breaks medical vs non-medical, so we use that info
    # prior to 2008, we rely on collection excluding medical from custom data, and the rest is medical collection
    if y>2008:
        medNonMedCol = ' (CASE WHEN b.colmed>0    THEN 100  ELSE 0 END)  AS colmed, ' + \
                       ' (CASE WHEN b.colexmed>0  THEN 100  ELSE 0 END)  AS colexmed, '
    else:
        medNonMedCol = ' (CASE WHEN a.colexmed<=0 THEN 0 ELSE 100 END) AS colexmed, ' + \
                       ' (CASE WHEN (b.col<=(CASE WHEN a.colexmed<=0 THEN 0 ELSE a.colexmed  END) OR b.col IS NULL) THEN 0 ELSE 100 END) AS colmed, '


    #### merge custom and collection
    # indicator columns are coded 0/100; balances are floored at 0
    sql2run = 'SELECT a.asOfDate, a.subjectKey, a.zipcode, a.score, a.monthdlq, ' + \
              '(CASE WHEN b.unpdcol  >0  THEN 100         ELSE 0 END)  AS unpdcol, ' + \
              '(CASE WHEN b.unpdcoly1>0  THEN 100         ELSE 0 END)  AS unpdcoly1, ' + \
              '(CASE WHEN b.unpdcoly3>0  THEN 100         ELSE 0 END)  AS unpdcoly3, ' + medNonMedCol + \
              '(CASE WHEN b.colbal   >0  THEN b.colbal    ELSE 0 END)  AS colbal,   ' + \
              '(CASE WHEN b.colbaly1 >0  THEN b.colbaly1  ELSE 0 END)  AS colbaly1, ' + \
              '(CASE WHEN b.colbaly3 >0  THEN b.colbaly3  ELSE 0 END)  AS colbaly3, ' + \
              '(CASE WHEN b.medbal   >0  THEN b.medbal    ELSE 0 END)  AS medbal,   ' + \
              '(CASE WHEN b.exmedbal >0  THEN b.exmedbal  ELSE 0 END)  AS exmedbal  ' + \
              'FROM (' + sql_custom + ') a LEFT JOIN (' + sql_collection + ') b ' + \
              'ON a.subjectKey = b.subjectKey AND a.asOfDate=b.asOfDate '
    sqlContext.sql(sql2run).write.mode('overwrite').parquet(os.path.join(WORKDIR, 'custom_temp')) 

    #### merge custom and trade (same process for both data regime)
    # the *_cond variants stay NULL when the subject has no credit card row in
    # the trade query, while the unconditional ones are 0 in that case
    tempa = 'parquet.`' + os.path.join(WORKDIR, 'custom_temp') + '`'
 
    sql2run = "SELECT a.*, " + \
              "(CASE WHEN (b.cc_ncr   <= 0 OR b.cc_ncr  IS NULL)                   THEN 0 ELSE 100 END) AS ccdq,      " + \
              "(CASE WHEN (b.cc_dlq   <= 0 OR b.cc_dlq  IS NULL)                   THEN 0 ELSE 100 END) AS ccdq_fl,   " + \
              "(CASE WHEN (b.cc_ncr   <= 0 OR b.cc_ncr  IS NULL OR a.monthdlq>12)  THEN 0 ELSE 100 END) AS ccdqy1,    " + \
              "(CASE WHEN (b.cc_ncr   <= 0 OR b.cc_ncr  IS NULL OR a.monthdlq>36)  THEN 0 ELSE 100 END) AS ccdqy3,    " + \
              "(CASE WHEN  b.cc_ncr   is NULL THEN NULL ELSE (CASE WHEN b.cc_ncr  > 0                    THEN 100 ELSE 0 END) END) AS ccdq_cond,      " + \
              "(CASE WHEN  b.cc_dlq   is NULL THEN NULL ELSE (CASE WHEN b.cc_dlq  > 0                    THEN 100 ELSE 0 END) END) AS ccdq_fl_cond,   " + \
              "(CASE WHEN  b.cc_ncr   is NULL THEN NULL ELSE (CASE WHEN b.cc_ncr  > 0 AND a.monthdlq<=12 THEN 100 ELSE 0 END) END) AS ccdqy1_cond,    " + \
              "(CASE WHEN  b.cc_ncr   is NULL THEN NULL ELSE (CASE WHEN b.cc_ncr  > 0 AND a.monthdlq<=36 THEN 100 ELSE 0 END) END) AS ccdqy3_cond     " + \
              "FROM " + tempa + " a LEFT JOIN (" + sql_trade + ") b ON a.subjectKey = b.subjectKey AND a.asOfDate=b.asOfDate "
    sqlContext.sql(sql2run).write.mode('overwrite').parquet(os.path.join(WORKDIR, 'custom_temp2'))
    # custom_temp has been consumed by the write above and is no longer needed
    shutil.rmtree(os.path.join(WORKDIR, 'custom_temp')) 

    #### merge custom and public record
    # note we express bankruptcy rate as bankruptcy over 1000 individuals
    # NOTE(review): only the three *y3 columns are coded 0/1000; bkrt, bkrt7
    # and bkrt13 are coded 0/100 - confirm the mixed scaling is intended.
    tempa = 'parquet.`' + os.path.join(WORKDIR, 'custom_temp2') + '`'
    sql2run = "SELECT a.*, " + \
              "CASE WHEN b.bkrt     IS NULL THEN 0 ELSE 100  END AS bkrt,     " + \
              "CASE WHEN b.bkrt7    IS NULL THEN 0 ELSE 100  END AS bkrt7,    " + \
              "CASE WHEN b.bkrt13   IS NULL THEN 0 ELSE 100  END AS bkrt13,   " + \
              "CASE WHEN b.bkrty3   IS NULL THEN 0 ELSE 1000 END AS bkrty3,   " + \
              "CASE WHEN b.bkrt7y3  IS NULL THEN 0 ELSE 1000 END AS bkrt7y3,  " + \
              "CASE WHEN b.bkrt13y3 IS NULL THEN 0 ELSE 1000 END AS bkrt13y3  " + \
              "FROM " + tempa + " a LEFT JOIN (" + sql_pubrec + ") b ON a.subjectKey = b.subjectKey AND a.asOfDate=b.asOfDate "

    result  = sqlContext.sql(sql2run)
    result.write.mode('overwrite').parquet(os.path.join(WORKDIR, outTab)) 
    # NOTE(review): count() is issued on the query object after the write, so
    # the join is presumably evaluated a second time - confirm whether counting
    # the written table would be preferable.
    print(' Merged Temp Table at ' + ym + '! row count: ' + str(result.count()))
    # custom_temp2 is deleted only after the count above, which still reads it
    shutil.rmtree(os.path.join(WORKDIR, 'custom_temp2'))

    #clear cache
    sqlContext.clearCache()

    #### aggregate SQL syntax. Use right before FROM since the last row does not end with ','

    # different variables for snapshot vs mover data
    if snapShotData:
        aggSQL =  'COUNT(asOfDate)       AS numobs, '          + \
                  'AVG(score)            AS score,  '          + \
                  'AVG(unpdcol)          AS unpdcol,  '        + \
                  'AVG(unpdcoly1)        AS unpdcoly1,  '      + \
                  'AVG(unpdcoly3)        AS unpdcoly3,  '      + \
                  'AVG(colmed)           AS colmed, '          + \
                  'AVG(colexmed)         AS colexmed, '        + \
                  'AVG(colbal)           AS colbal, '          + \
                  'AVG(colbaly1)         AS colbaly1, '        + \
                  'AVG(colbaly3)         AS colbaly3, '        + \
                  'AVG(medbal)           AS medbal, '          + \
                  'AVG(exmedbal)         AS exmedbal, '        + \
                  'AVG(ccdq)             AS ccdq,   '          + \
                  'AVG(ccdq_cond)        AS ccdq_cond, '       + \
                  'AVG(ccdq_fl)          AS ccdq_fl, '         + \
                  'AVG(ccdq_fl_cond)     AS ccdq_fl_cond, '    + \
                  'AVG(ccdqy1)           AS ccdqy1,   '        + \
                  'AVG(ccdqy1_cond)      AS ccdqy1_cond, '     + \
                  'AVG(ccdqy3)           AS ccdqy3,   '        + \
                  'AVG(ccdqy3_cond)      AS ccdqy3_cond, '     + \
                  'AVG(bkrt)             AS bkrt,   '          + \
                  'AVG(bkrt7)            AS bkrt7,   '         + \
                  'AVG(bkrt13)           AS bkrt13,   '        + \
                  'AVG(bkrty3)           AS bkrty3,  '         + \
                  'AVG(bkrt7y3)          AS bkrt7y3,  '        + \
                  'AVG(bkrt13y3)         AS bkrt13y3  '       
    else:
        # reduced list: no balance columns, only ccdq_cond among the *_cond
        # columns, and no bkrt7 / bkrt13
        aggSQL =  'COUNT(asOfDate)       AS numobs, '          + \
                  'AVG(score)            AS score,  '          + \
                  'AVG(unpdcol)          AS unpdcol,  '        + \
                  'AVG(unpdcoly1)        AS unpdcoly1,  '      + \
                  'AVG(unpdcoly3)        AS unpdcoly3,  '      + \
                  'AVG(colmed)           AS colmed, '          + \
                  'AVG(colexmed)         AS colexmed, '        + \
                  'AVG(ccdq)             AS ccdq,   '          + \
                  'AVG(ccdq_cond)        AS ccdq_cond, '       + \
                  'AVG(ccdq_fl)          AS ccdq_fl,   '       + \
                  'AVG(ccdqy1)           AS ccdqy1,   '        + \
                  'AVG(ccdqy3)           AS ccdqy3,   '        + \
                  'AVG(bkrt)             AS bkrt, '            + \
                  'AVG(bkrt7y3)          AS bkrt7y3,  '        + \
                  'AVG(bkrt13y3)         AS bkrt13y3, '        + \
                  'AVG(bkrty3)           AS bkrty3   '
    return aggSQL








